﻿using Confluent.Kafka;
using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleKafka
{
    /// <summary>
    /// Console sample that produces a single message to a Kafka topic and then
    /// consumes messages from the subscribed topics until Ctrl+C is pressed.
    /// </summary>
    class Program
    {
        // Broker address list shared by the producer and the consumer.
        // NOTE(review): left empty in this sample; presumably it has to be filled in
        // with the real broker address(es) before running - confirm.
        private const string BrokerList = "";

        /// <summary>
        /// Entry point: produces one test message, runs the consume loop until it is
        /// cancelled, then waits for a key press before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            // Disabled snippet kept for reference: creating a topic through the admin client.
            //var config2 = new AdminClientConfig() { BootstrapServers = "" };

            //using var adminClient = new AdminClientBuilder(config2).Build();

            //adminClient.CreateTopicsAsync(new List<TopicSpecification>
            //    {
            //         new TopicSpecification{
            //              Name="test3"
            //         }
            //    }).GetAwaiter().GetResult();

            await ProduceTestMessageAsync();
            RunConsumer();

            Console.ReadKey();
        }

        /// <summary>
        /// Produces one UTF-8 encoded test message (no key) to the "test" topic.
        /// A delivery failure is reported on the console instead of terminating the
        /// program, mirroring how the consume loop reports its errors.
        /// </summary>
        private static async Task ProduceTestMessageAsync()
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = BrokerList,
                QueueBufferingMaxMessages = 10,
                MessageTimeoutMs = 5000,
                RequestTimeoutMs = 3000,
            };

            using (var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build())
            {
                try
                {
                    await producer.ProduceAsync("test", new Message<string, byte[]>
                    {
                        Value = Encoding.UTF8.GetBytes("testsss"),
                    });
                }
                catch (ProduceException<string, byte[]> produceError)
                {
                    Console.WriteLine($"Produce error: {produceError.Error.Reason}");
                }
            }
        }

        /// <summary>
        /// Subscribes to the sample topics and prints every received message until
        /// Ctrl+C is pressed. Offsets are committed manually (auto commit is disabled)
        /// for every message whose offset is a multiple of five.
        /// </summary>
        private static void RunConsumer()
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = BrokerList,
                GroupId = "csharp-consumer",
                EnableAutoCommit = false,
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
                // Note: All handlers are called on the main .Consume thread.
                .SetErrorHandler((_, error) => Console.WriteLine($"Error: {error.Reason}"))
                .Build())
            using (var cts = new CancellationTokenSource())
            {
                // Turn Ctrl+C into a cancellation request so the loop below can exit
                // through the OperationCanceledException handler and close the consumer,
                // rather than having the process killed mid-consume.
                ConsoleCancelEventHandler onCancelKeyPress = (sender, eventArgs) =>
                {
                    eventArgs.Cancel = true; // keep the process alive for a clean shutdown
                    cts.Cancel();
                };
                Console.CancelKeyPress += onCancelKeyPress;

                consumer.Subscribe(new List<string> { "test", "sample.console.showtime" });

                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cts.Token);

                            // NOTE(review): EnablePartitionEof is not set in the config above,
                            // so this branch may never be taken - confirm against the client docs.
                            if (consumeResult.IsPartitionEOF)
                            {
                                Console.WriteLine(
                                    $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                                continue;
                            }

                            Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");

                            if (consumeResult.Offset % 5 == 0)
                            {
                                // The Commit method sends a "commit offsets" request to the Kafka
                                // cluster and synchronously waits for the response. This is very
                                // slow compared to the rate at which the consumer is capable of
                                // consuming messages. A high performance application will typically
                                // commit offsets relatively infrequently and be designed to handle
                                // duplicate messages in the event of failure.
                                try
                                {
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException commitError)
                                {
                                    Console.WriteLine($"Commit error: {commitError.Error.Reason}");
                                }
                            }
                        }
                        catch (ConsumeException consumeError)
                        {
                            // A failed consume is reported and the loop keeps polling.
                            Console.WriteLine($"Consume error: {consumeError.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Closing consumer.");
                    consumer.Close();
                }
                finally
                {
                    // Detach before the token source is disposed so a later Ctrl+C
                    // cannot call Cancel() on a disposed instance.
                    Console.CancelKeyPress -= onCancelKeyPress;
                }
            }
        }
    }
}
